 1.数据采集之数据说明
   
   智慧物流项目挑选出两个重要的场景，一个仓储预测，另一个是车货匹配。
   其中仓储预测使用机器学习算法LightGBM;智能调度则使用动态规划算法。
   算法开发的一般流程：
       机器学习算法开发流程
	   1).准备数据、公司本身数据、购买数据
	   2).明确问题
       建立算法数据：根据数据类型划分应用种类
	   3).数据基本处理
       pd去处理数据(缺失值，合并表....)
	   4).特征工程
       特征进行处理(训练集，测试集、验证集)
	   5).找寻合适的算法进行分析
       (1)、估计器选择
       (2)、调用fit(x_train,y_train)
       (3)、调用 a)预测：y_predict=predict(x_test) b)预测的准确率：score(x_test,y_test)
	   6).模型的评估 ---->评估不合格，则考虑：1、换算法 2、 调参数 3、 特征工程再进一步处理
	   7).模型实现预测，以API形式提供，或者直接提供计算结果数据
	   传统算法开发流程
       传统算法的开发流程相对于机器学习算法开发流程更简单，只保留前几个部分即可。
   不管使用何种算法来解决问题，重要的都是我们能拿到的数据，只有充分利用现有的数据进行数据分析
和处理，选择合适的算法才能获取到比较好的结果。在一个成熟的企业中对于算法的选择和实现以及评价
调优都是由专门的算法工程师或者机器学习工程师来负责。我们大数据部门工程师主要负责提供满足算法
部门要求的数据。我们以仓储预测场景来梳理一个算法的开发流程。
   Mysql数据库：lg_orders(订单表),lg_items(商品表)，lg_item_cats(商品分类表),lg_enterports(仓库表)
   数据采集：使用Sqoop把数据同步到Hive数仓中
   数仓ETL：对数据统一化，清洗，预处理
   数据特征工程：Python/Scala
   训练模型：SparkMlib
   模型预测：SparkMlib
   模型评价：损失函数
   模型优化：调整    

 2.业务数据
   
   根据算法部门的要求，仓储预测模型需要提供4类数据
   sales_train;items;item_categories;entreports
       sales_train
       表示的是销售数据，有日期，月份,仓库，商品，价格和日销售量
   统计出的是每个仓库每个商品的日销量数据，可以来自于订单表(lg_orders);同步到数仓中然后统计出
指标数据；数据库是mysql；
       items
       表示的是商品信息，有商品名称，商品id,商品分类id
       可以来自商品表，数据库：mysql
	   entreports
       仓库数据集，
       仓库id,仓库名称(地址信息)
       数据库：mysql
	   item_category
       来源商品分类表，其中有商品分类id以及分类的名称
       数据来自表，来源是mysql.
   
   选择hadoop1上的Mysql模拟业务数据库
   创建lg_lgstic数据库
   mysql> create database lg_logstic;
Query OK, 1 row affected (0.01 sec)
   mysql> use lg_logstic;
Query OK, 1 row affected (0.01 sec)
#上传sql脚本文件到root/mysql_logstic，source执行
   mysql> source /root/mysql_logstic/lg_logstic.sql;
Query OK, 1 row affected (0.01 sec)

 3.同步数据到Hive
   
   数仓分层实现
   数仓主要分为四层：
       ODS
       DWD
       DWS
       ADS
   在Hive中分别创建四个数据库对应数仓的四层
   create database lg_ods;
   create database lg_dwd;
   create database lg_dws;
   create database lg_ads;
   
   对于数据采集来说分为两部分，
       一部分是离线数据采集，主要是Mysql中相关业务数据的采集，
       一部分是实时数据采集，主要是车辆行驶相关数据采集。
   
   1).离线数据采集
   对于以上业务数据的采集来说，继续沿用离线数仓的数据采集方式，也就是针对不同的表类型选择不
同的同步方式。
   共有6张表
   lg_orders,
   lg_order_entrepot,
   lg_order_items,
   lg_items,
   lg_entrepots,
   lg_item_cats
   
   事实表：lg_orders,lg_order_items，lg_order_entrepot
   维度表：lg_items,lg_entrepots,lg_item_cats
   对于lg_orders,lg_items,lg_entrepots,lg_item_cats在数仓中主要以拉链表方式保存；
   Hive创建ODS层表
   -- 创建ODS层商品分类表
drop table if exists `lg_ods`.`lg_item_cats`;
create table `lg_ods`.`lg_item_cats`(
catId bigint,
parentId bigint,
catName string,
isShow bigint,
isFloor bigint,
catSort bigint,
dataFlag bigint,
createTime string,
commissionRate double,
catImg string,
subTitle string,
simpleName string,
seoTitle string,
seoKeywords string,
seoDes string,
catListTheme string,
detailTheme string,
mobileCatListTheme string,
mobileDetailTheme string,
wechatCatListTheme string,
wechatDetailTheme string,
cat_level bigint,
modifyTime string
)

partitioned by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;

-- 创建ODS层商品表
drop table if exists `lg_ods`.`lg_items`;
create table `lg_ods`.`lg_items`(
itemsId bigint,
itemsSn string,
productNo string,
itemsName string,
itemsImg string,
entrepotId bigint,
itemsType bigint,
marketPrice double,
entrepotPrice double,
warnStock bigint,
itemsStock bigint,
itemsUnit string,
itemsTips string,
isSale bigint,
isBest bigint,
isHot bigint,
isNew bigint,
isRecom bigint,
itemsCatIdPath string,
itemsCatId bigint,
entrepotCatId1 bigint,
entrepotCatId2 bigint,
brandId bigint,
itemsDesc string,
itemsStatus bigint,
saleNum bigint,
saleTime string,
visitNum bigint,
appraiseNum bigint,
isSpec bigint,
gallery string,
itemsSeoKeywords string,
illegalRemarks string,
dataFlag bigint,
createTime string,
isFreeShipping bigint,
itemsSerachKeywords string,
modifyTime string
)
partitioned by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;

--创建ods层订单仓库关联表
drop table if exists `lg_ods`.`lg_order_entrepot`;
CREATE TABLE `lg_ods`.`lg_order_entrepot` (
oeId bigint, 
orderId bigint, 
itemId bigint, 
itemNums bigint, 
itemName string, 
entrepotId int, 
userName string,
userAddress string, 
promotionJson string,
createtime string,
modifyTime string 
) partitioned by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;

-- 创建ODS层仓库表
drop table if exists `lg_ods`.`lg_entrepots`;
CREATE TABLE `lg_ods`.`lg_entrepots` (
entrepotId bigint,
areaId bigint,
entrepotName string,
entrepotkeeper string,
telephone string,
entrepotImg string,
entrepotTel string,
entrepotQQ string,
entrepotAddress string,
invoiceRemarks string,
serviceStartTime bigint,
serviceEndTime bigint,
freight bigint,
entrepotAtive int,
entrepotStatus int,
statusDesc string,
dataFlag int,
createTime string ,
modifyTime string
) partitioned by (dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- 创建ODS层订单表
drop table if exists `lg_ods`.`lg_orders`;
CREATE TABLE lg_ods.lg_orders (
orderId bigint,
orderNo string,
userId bigint,
orderStatus bigint,
itemsMoney double,
deliverType bigint,
deliverMoney double,
totalMoney double,
realTotalMoney double,
payType bigint,
isPay bigint,
areaId bigint,
userAddressId bigint,
areaIdPath string,
userName string,
userAddress string,
userPhone string,
orderScore bigint,
isInvoice bigint,
invoiceClient string,
orderRemarks string,
orderSrc bigint,
needPay double,
payRand bigint,
orderType bigint,
isRefund bigint,
isAppraise bigint,
cancelReason bigint,
rejectReason bigint,
rejectOtherReason string,
isClosed bigint,
itemsSearchKeys string,
orderunique string,
isFromCart string,
receiveTime string,
deliveryTime string,
tradeNo string,
dataFlag bigint,
createTime string,
settlementId bigint,
commissionFee double,
scoreMoney double,
useScore bigint,
orderCode string,
extraJson string,
orderCodeTargetId bigint,
noticeDeliver bigint,
invoiceJson string,
lockCashMoney double,
payTime string,
isBatch bigint,
totalPayFee bigint,
modifyTime string
) partitioned by (dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- 创建ODS层订单明细表
drop table if exists `lg_ods`.`lg_order_items`;
create table `lg_ods`.`lg_order_items`(
ogId bigint,
orderId bigint,
itemsId bigint,
itemsNum bigint,
itemsPrice double,
payPrice double,
itemsSpecId bigint,
itemsSpecNames string,
itemsName string,
itemsImg string,
extraJson string,
itemsType bigint,
commissionRate double,
itemsCode string,
promotionJson string,
createtime string
)
partitioned by (dt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED
AS TEXTFILE;

   使用sqoop1实现采集；在CDH中其实建议使用sqoop1不建议使用sqoop2所以我们采用sqoop1实现
数据的采集！
   sqoop在导入数据时，可以使用--query搭配sql来指定查询条件，并且还需在sql中添加$CONDITIONS,
来实现并行运行mr的功能。
   如果使用--query但是不加$CONDITIONS会报错。如下
   ERROR tool.ImportTool: Import failed: java.io.IOException: Query [select * from
Person where score>50] must contain '$CONDITIONS' in WHERE clause.
   指定多个mapper导入数据；-m设置多个任务，比如 -m 10 注意必须加上-split-by参数用来指定如
何划分数据
   补充：sqoop会向关系型数据库例如mysql发送一个命令：select max(id),min(id) from test ;会把
max和min之间的区间平分10份，最后并行10个map去拉取数据。
   数据源：hadoop1的Mysql数据库
   目的地：Hive的ODS层表
   编写sqoop导入任务，从mysql中抽取数据到Hive中
   (1).第一次全量导入
   导入lg_orders表
   以lg_orders表为例：编写shell脚本
   import_order_data.sh
#!/bin/bash
source /etc/profile
##如果第一个参数不为空，则作为工作日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期，减一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定义sqoop命令位置，Hive命令位置，在hadoop2
sqoop=/opt/cloudera/parcels/CDH/bin/sqoop
Hive=/opt/cloudera/parcels/CDH/bin/hive
#定义工作日期
#do_date=20200827

#编写导入数据通用方法 接收两个参数：第一个：表名，第二个：查询语句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 123456 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}

# 全量导入订单数据方法
import_lg_orders(){
import_data lg_orders "select * from lg_orders where 1=1"
}
#调用全量导入订单数据方法
import_lg_orders

#注意sqoop导入数据的方式，对于Hive分区表来说需要执行添加分区操作，数据才能被识别到
$Hive -e "alter table lg_ods.lg_orders add partition(dt='$do_date');"

   对于其它表可以仿照上面的实现编写独立脚本，也可以全部综合到一起，如下
   import_all_data.sh

#!/bin/bash
source /etc/profile
##如果第一个参数不为空，则作为工作日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期，减一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi
#定义sqoop命令位置，Hive命令位置，在hadoop2
sqoop=/opt/lagou/servers/sqoop-1.4.7
Hive=/opt/lagou/servers/hive
#定义工作日期

#编写导入数据通用方法 接收两个参数：第一个：表名，第二个：查询语句
import_data(){
$sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 12345678 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}

# 全量导入订单数据方法
import_lg_orders(){
import_data lg_orders "select * from lg_orders where 1=1"
}

# 全量导入订单明细数据(包含商品)方法
import_lg_order_items(){
import_data lg_order_items "select * from lg_order_items where 1=1"
}

# 全量导入商品方法
import_lg_items(){
import_data lg_items "select * from lg_items where 1=1"
}

# 全量导入仓库方法
import_lg_entrepots(){
import_data lg_entrepots "select * from lg_entrepots where 1=1"
}

# 全量导入商品分类数据方法
import_lg_item_cats(){
import_data lg_item_cats "select * from lg_item_cats where 1=1"
}

# 全量导入订单仓库关联数据方法
import_lg_order_entrepot(){
import_data lg_order_entrepot "select * from lg_order_entrepot where 1=1"
}

#调用全量导入订单数据方法
import_lg_orders
#调用全量导入订单明细数据方法
import_lg_order_items
#调用全量导入商品数据方法
import_lg_items
#调用全量导入仓库数据方法
import_lg_entrepots
#调用全量导入商品分类数据方法
import_lg_item_cats
#调用全量导入订单仓库关联数据方法
import_lg_order_entrepot

#注意sqoop导入数据的方式，对于Hive分区表来说需要执行添加分区操作，数据才能被识别到
$Hive -e "alter table lg_ods.lg_orders add partition(dt='$do_date');
alter table lg_ods.lg_order_items add partition(dt='$do_date');
alter table lg_ods.lg_items add partition(dt='$do_date');
alter table lg_ods.lg_entrepots add partition(dt='$do_date');
alter table lg_ods.lg_item_cats add partition(dt='$do_date');
alter table lg_ods.lg_order_entrepot add partition(dt='$do_date');"

   报错：缺少mysql驱动类
   cp mysql-connector-java.jar /opt/cloudera/parcels/CDH-5.14.0-
1.cdh5.14.0.p0.24/lib/sqoop/lib/mysql-connector.jar
   验证导入结果
   select * from lg_ods.lg_orders limit 5;
   select * from lg_ods.lg_order_items limit 5;
   select * from lg_ods.lg_items limit 5;
   select * from lg_ods.lg_entrepots limit 5;
   select * from lg_ods.lg_item_cats limit 5;
   select * from lg_ods.lg_order_entrepot limit 5;
   (2). 增量导入
   对于拉链表则ODS表需要每日拉取新增和更新的数据
   --抽取每日新增和更新的订单数据
SELECT *
FROM lg_orders
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${do_date}';

   shell脚本文件
   import_incr_data.sh
#!/bin/bash
source /etc/profile
##如果第一个参数不为空，则作为工作日期使用
if [ -n "$1" ]
then
do_date=$1
else
##昨天日期，减一
do_date=`date -d "-1 day" +"%Y%m%d"`
fi

#sqoop=/opt/cloudera/parcels/CDH/bin/sqoop
#Hive=/opt/cloudera/parcels/CDH/bin/hive

import_data(){
sqoop import \
--connect jdbc:mysql://linux123:3306/lg_logstic \
--username root \
--password 123456 \
--target-dir /user/hive/warehouse/lg_ods.db/$1/dt=$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by ',' \
--null-string '\\N' \
--null-non-string '\\N'
}

##导入新增订单数据的方法
import_lg_orders(){
import_data lg_orders "SELECT *
FROM lg_orders
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${do_date}' "
}
# 导入新增订单明细数据(包含商品)方法
import_lg_order_items(){
import_data lg_order_items "select
*   
from lg_order_items
WHERE DATE_FORMAT(createTime, '%Y%m%d') = '${do_date}'"
}
# 导入新增和变化商品方法
import_lg_items(){
import_data lg_items "select
*   
from lg_items
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${do_date}'"
}
# 导入新增和变化仓库方法
import_lg_entrepots(){
import_data lg_entrepots "select
*   
from lg_entrepots
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${do_date}'"
}
# 导入新增商品分类数据方法
import_lg_item_cats(){
import_data lg_item_cats "select
*   
from lg_item_cats
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${do_date}'"
}
# 导入新增订单仓库关联数据方法
import_lg_order_entrepot(){
import_data lg_order_entrepot "select
*   
from lg_order_entrepot
WHERE DATE_FORMAT(modifyTime, '%Y%m%d') = '${do_date}'"
}
#导入新增订单明细数据方法
import_lg_order_items
#调用导入新增商品数据方法
import_lg_items
#调用导入新增仓库数据方法
import_lg_entrepots
#导入新增商品分类数据方法
import_lg_item_cats
#导入新增订单数据
import_lg_orders
#导入新增订单仓库关联数据
import_lg_order_entrepot

#执行Hive修复分区命令
hive -e "alter table lg_ods.lg_orders add partition(dt='$do_date');
alter table lg_ods.lg_order_items add partition(dt='$do_date');
alter table lg_ods.lg_items add partition(dt='$do_date');
alter table lg_ods.lg_entrepots add partition(dt='$do_date');
alter table lg_ods.lg_item_cats add partition(dt='$do_date');
alter table lg_ods.lg_order_entrepot add partition(dt='$do_date');"

   验证导入结果
   select * from lg_ods.lg_orders where dt='20200918' limit 5;
   select * from lg_ods.lg_order_items where dt='20200918' limit 5;
   select * from lg_ods.lg_items where dt='20200918' limit 5;
   select * from lg_ods.lg_entrepots where dt='20200918' limit 5;
   select * from lg_ods.lg_item_cats where dt='20200918' limit 5;
   select * from lg_ods.lg_order_entrepot where dt='20200918' limit 5;
   
   
